// Copyright 2021 by 马万里. All rights reserved.
// 开发团队 ：   鸡中之霸
// 开发人员 ：   马万里
// 开发时间 ：   2021/3/1 16:46
// 文件名称 ：   rabbitmq.go
// 工程名称 ：   seckill
// 开发工具 ：   GoLand
//

package rabbitmq

import (
	"fmt"
	"github.com/streadway/amqp"
	"log"
)

const (
	// amqp:账号:密码@服务器地址:端口号/vhost
	MQURL = "amqp://seckill:seckill@johnsonsmile.cn:5672/seckill"
)

type RabbitMQ struct {
	conn    *amqp.Connection
	channel *amqp.Channel
	// 队列名称
	QueueName string
	// 交换机
	Exchange string
	// key
	Key string
	// 连接信息
	MqUrl string
}

// NewRabbitMQ: 创建结构体实例
func NewRabbitMQ(queueName string, exchange string, key string) *RabbitMQ {
	var err error
	rabbitmq := new(RabbitMQ)
	rabbitmq.QueueName = queueName
	rabbitmq.Exchange = exchange
	rabbitmq.Key = key
	rabbitmq.MqUrl = MQURL
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
	rabbitmq.failOnErr(err, "创建连接错误!")
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "获取channel失败")
	return rabbitmq
}

// Destory: 断开channel和connection
func (r RabbitMQ) Destory() {
	r.channel.Close()
	r.conn.Close()
}

// failOnErr: 错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
	if err != nil {
		log.Fatalf("%s:%s\n", message, err.Error())
		panic(fmt.Sprintf("%s:%s\n", message, err.Error()))
	}
}

// 简单模式: rabbitMQ NewRabbitMQSimple: 获取simpleMQ
func NewRabbitMQSimple(queueName string) *RabbitMQ {
	return NewRabbitMQ(queueName, "", "")
}

// 简单模式: 生成者
func (r *RabbitMQ) PublishSimple(message string) {
	// 1. 申请队列,如果队列不存在会自动创建,如果存在则跳过,保证一定可以发送到队列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // 是否持久化
		false,
		false, // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)
	if err != nil {
		fmt.Println(err.Error())
	}
	// 2. 发送消息到队列中
	err = r.channel.Publish(
		r.Exchange,
		r.QueueName,
		false, // 是否返还给发送者
		false, // 如果为true，当exchange发送消息到队列后发现队列上没有消费者，则会把消息返还给发送者
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	if err != nil {
		fmt.Println(err)
	}
}

// 简单模式: 消费者
func (r *RabbitMQ) ConsumeSimple() {
	// 1. 申请队列,如果队列不存在会自动创建,如果存在则跳过,保证一定可以发送到队列中
	_, err := r.channel.QueueDeclare(
		r.QueueName,
		false, // 是否持久化
		false,
		false, // 是否排他
		false, // 是否阻塞
		nil,   // 额外属性
	)
	if err != nil {
		fmt.Println(err.Error())
	}
	// 2. 发送消息到队列中
	msgs, err := r.channel.Consume(
		r.QueueName,
		"",    //区分多个消费者
		true,  // 是否自动应答
		false, // 是否有排他性
		false, // 如果设置为true,不能将同一个connection中发送的消息传递给这个connection中的其他消费者
		false,
		nil,
	)
	if err != nil {
		fmt.Println(err.Error())
	}
	forever := make(chan bool)
	go func() {
		for d := range msgs {
			// 实现要处理的逻辑函数
			log.Printf("Receive message: %s\n", d.Body)
		}
	}()
	log.Printf("[*] Waiting for message, To exit press CTRL+C\n")
	<-forever
}

// 订阅模式下的RabbitMQ实例
func NewRabbitMQPubSub(exchangeName string) *RabbitMQ {
	// 创建rabbitMQ实例
	rabbitmq := NewRabbitMQ("", exchangeName, "")
	var err error
	// 获取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
	rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
	// 获取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel!")
	return rabbitmq
}

// 订阅模式生产者
func (r *RabbitMQ) PublishPub(message string) {
	// 1. 尝试创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		amqp.ExchangeFanout,
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an exchange")
	// 2. 广播消息
	err = r.channel.Publish(
		r.Exchange,
		"",
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
	if err != nil {
		log.Println(err.Error())
	}
}

// 订阅模式消费者
func (r *RabbitMQ) ReceiveSub() {
	// 1. 尝试获取交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		amqp.ExchangeFanout,
		true,
		false,
		false,
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare an exchange!")

	// 2. 试探性创建队列,注意队列名称
	q, err := r.channel.QueueDeclare(
		"", // 名字不写,随机生产队列名称
		false,
		false,
		true,
		false,
		nil,
	)
	r.failOnErr(err, "failed to declare a queue")

	// 3. 队列绑定到交换机
	err = r.channel.QueueBind(
		q.Name,
		"",
		r.Exchange,
		false,
		nil)
	r.failOnErr(err, "failed to bind a queue to exchange")

	// 4. 消费消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil)

	forever := make(chan bool)
	go func() {
		for d := range message {
			// 实现要处理的逻辑函数
			log.Printf("Receive message: %s\n", d.Body)
		}
	}()
	log.Printf("[*] Waiting for message, To exit press CTRL+C\n")
	<-forever
}

// 创建路由模式RabbitMQ实例对象
func NewRabbitMQRouting(exchangeName string, routingKey string) *RabbitMQ {
	// 创建rabbitMQ实例对象
	rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
	var err error
	// 获取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
	rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
	// 获取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

// 路由模式发送消息
func (r *RabbitMQ) PublishRouting(message string) {
	// 1. 尝试创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil)
	r.failOnErr(err, "Failed to declare an exchange")
	// 2. 发送消息
	r.channel.Publish(
		r.Exchange,
		r.Key,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
}

// 路由模式消费消息
func (r *RabbitMQ) ReceiveRouting() {
	// 1. 尝试获取交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		amqp.ExchangeDirect,
		true,
		false,
		false,
		false,
		nil)
	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 尝试获取队列
	q, err := r.channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil)
	r.failOnErr(err, "Failed to get a queue")

	// 3. 为队列绑定交换机
	err = r.channel.QueueBind(
		q.Name,
		r.Key,
		r.Exchange,
		false,
		nil)
	r.failOnErr(err, "Failed to bind a exchange")

	// 4. 消费消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil)

	forever := make(chan bool)
	go func() {
		for d := range message {
			// 实现要处理的逻辑函数
			log.Printf("Receive message: %s\n", d.Body)
		}
	}()
	log.Printf("[*] Waiting for message, To exit press CTRL+C\n")
	<-forever
}


// 创建路由模式RabbitMQ实例对象
func NewRabbitMQTopic(exchangeName string, routingKey string) *RabbitMQ {
	// 创建rabbitMQ实例对象
	rabbitmq := NewRabbitMQ("", exchangeName, routingKey)
	var err error
	// 获取connection
	rabbitmq.conn, err = amqp.Dial(rabbitmq.MqUrl)
	rabbitmq.failOnErr(err, "failed to connect rabbitmq!")
	// 获取channel
	rabbitmq.channel, err = rabbitmq.conn.Channel()
	rabbitmq.failOnErr(err, "failed to open a channel")
	return rabbitmq
}

// Topic模式发送消息
func (r *RabbitMQ) PublishTopic(message string) {
	// 1. 尝试创建交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		amqp.ExchangeTopic,
		true,
		false,
		false,
		false,
		nil)
	r.failOnErr(err, "Failed to declare an exchange")
	// 2. 发送消息
	r.channel.Publish(
		r.Exchange,
		r.Key,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(message),
		})
}

// topic模式消费消息
func (r *RabbitMQ) ReceiveTopic() {
	// 1. 尝试获取交换机
	err := r.channel.ExchangeDeclare(
		r.Exchange,
		amqp.ExchangeTopic,
		true,
		false,
		false,
		false,
		nil)
	r.failOnErr(err, "Failed to declare an exchange")

	// 2. 尝试获取队列
	q, err := r.channel.QueueDeclare(
		"",
		false,
		false,
		true,
		false,
		nil)
	r.failOnErr(err, "Failed to get a queue")

	// 3. 为队列绑定交换机
	err = r.channel.QueueBind(
		q.Name,
		r.Key,
		r.Exchange,
		false,
		nil)
	r.failOnErr(err, "Failed to bind a exchange")

	// 4. 消费消息
	message, err := r.channel.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil)

	forever := make(chan bool)
	go func() {
		for d := range message {
			// 实现要处理的逻辑函数
			log.Printf("Receive message: %s\n", d.Body)
		}
	}()
	log.Printf("[*] Waiting for message, To exit press CTRL+C\n")
	<-forever
}